package com.ocss.ocsscustomerserver.websocket;


import com.alibaba.fastjson.JSON;
import com.ocss.ocsscommon.message.ClientMessage;
import com.ocss.ocsscommon.message.KafkaMessage;
import com.ocss.ocsscommon.message.KafkaVisitorAssignMessage;
import com.ocss.ocsscustomerserver.tools.VerifyTokenUntil;
import com.ocss.ocsscustomerserver.tools.WebsocketRecorder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;

import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.Date;

/**
 * WebSocket endpoint handler that bridges WebSocket sessions to Kafka.
 *
 * <p>When a connection is established the session is registered in the
 * {@link WebsocketRecorder} under the {@code "id"} session attribute and an
 * establish request is published to the {@code assignCustomer} topic. Each
 * incoming text message is parsed as a {@link ClientMessage}, converted to a
 * {@link KafkaMessage} and published to the {@code visitorSend} topic. When the
 * connection closes the session is removed from the recorder.
 */
@Component
public class SocketServer  implements WebSocketHandler {

    /** Kafka topic that receives the establish request for a new connection. */
    private static final String TOPIC_ASSIGN_CUSTOMER = "assignCustomer";
    /** Kafka topic that receives messages sent over the WebSocket. */
    private static final String TOPIC_VISITOR_SEND = "visitorSend";
    /** Command type code placed on the establish request. */
    private static final int ESTABLISH_COMMEND_TYPE = 4000;

    @Autowired
    VerifyTokenUntil verifyTokenUntil;
    @Autowired
    WebsocketRecorder websocketRecorder;
    @Autowired
    KafkaTemplate kafkaTemplate;

    /**
     * Registers the new session and publishes an establish request to Kafka.
     *
     * @param session the newly opened session; its {@code "id"} attribute is used
     *                as the key in the recorder and as the visitor on the request
     * @throws Exception if registration or publishing fails
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        String id = (String) session.getAttributes().get("id");

        // The remote address may be unavailable; leave ip/port null in that case
        // instead of failing the whole handshake with a NullPointerException.
        InetSocketAddress remote = session.getRemoteAddress();
        String visitorIp = remote == null ? null : hostAddressOf(remote);
        Integer visitorPort = remote == null ? null : Integer.valueOf(remote.getPort());

        websocketRecorder.putSession(id, session);
        System.out.println("连接成功,会话id是"+session.getId());

        KafkaVisitorAssignMessage establishRequest =
                new KafkaVisitorAssignMessage().builder()
                        .commendType(ESTABLISH_COMMEND_TYPE)
                        .Visitor(id)
                        .Customer(null)
                        .VisitorSource("bilibili")
                        .VisitorIp(visitorIp)
                        .VisitorPort(visitorPort)
                        .build();
        String sendMessage = JSON.toJSONString(establishRequest);
        kafkaTemplate.send(TOPIC_ASSIGN_CUSTOMER, sendMessage);
    }

    /**
     * Converts an incoming text message to a {@link KafkaMessage} and publishes it.
     *
     * <p>Frames whose payload is not text, and text that parses to no object
     * (for example an empty payload), are ignored.
     *
     * @param session the session the message arrived on
     * @param message the incoming frame; only {@code String} payloads are processed
     * @throws Exception if parsing or publishing fails
     */
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        Object payload = message.getPayload();
        if (!(payload instanceof String)) {
            // Non-text payloads are not JSON text; calling toString() on them
            // would only produce a parse failure.
            return;
        }

        ClientMessage clientMessage = JSON.parseObject((String) payload, ClientMessage.class);
        if (clientMessage == null) {
            // Nothing was parsed (empty or "null" payload); there is nothing to forward.
            return;
        }

        // The message is only forwarded to Kafka here; it is not persisted by this handler.
        KafkaMessage messageFormat = KafkaMessage.builder()
                .data(clientMessage.getData())
                .sendId(clientMessage.getFrom())
                .receiveId(clientMessage.getTo())
                .sendTime(new Date())
                .type(clientMessage.getType())
                .playServerId(clientMessage.getTo())
                .messageSource("火星总部")
                .build();
        String sendMessage = JSON.toJSONString(messageFormat);

        kafkaTemplate.send(TOPIC_VISITOR_SEND, sendMessage);
    }

    /**
     * Reports a transport-level failure and closes the session if it is still open.
     *
     * <p>Removal from the recorder is left to
     * {@link #afterConnectionClosed(WebSocketSession, CloseStatus)}.
     *
     * @param session   the session on which the error occurred
     * @param exception the reported error
     * @throws Exception if closing the session fails
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        // Do not swallow the failure silently.
        System.err.println("WebSocket transport error, session id " + session.getId() + ": " + exception);
        if (session.isOpen()) {
            session.close(CloseStatus.SERVER_ERROR);
        }
    }

    /**
     * Removes the closed session from the recorder.
     *
     * @param session     the session that was closed
     * @param closeStatus the status the connection was closed with
     * @throws Exception if removal fails
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        websocketRecorder.removeBySession(session);
    }

    /**
     * @return {@code false}; partial messages are not supported by this handler
     */
    @Override
    public boolean supportsPartialMessages() {
        return false;
    }

    /**
     * Returns the textual host address of a socket address.
     *
     * <p>Uses {@code getHostAddress()} rather than trimming the first character of
     * {@code InetAddress.toString()}, because that string has the form
     * {@code hostname/address} and only starts with {@code '/'} when no hostname
     * is known.
     *
     * @param remote the socket address, must not be {@code null}
     * @return the literal IP address, or the host string when the address is unresolved
     */
    private static String hostAddressOf(InetSocketAddress remote) {
        if (remote.isUnresolved()) {
            // getAddress() is null for unresolved addresses.
            return remote.getHostString();
        }
        return remote.getAddress().getHostAddress();
    }
}
